package org.example.dobs.demo.flink.wc.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.StringTokenizer;

/**
 * Flink streaming word count whose input is an in-memory collection
 * (not a file): a fixed list of words is turned into a DataStream,
 * tokenized, keyed by word, and summed.
 */
public class WC_Collection {
    public static void main(String[] args) throws Exception {
        // step1: obtain the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // step2: build the input — a fixed in-memory word list
        ArrayList<String> words = new ArrayList<>();
        for (String w : new String[]{"share", "it", "clone", "it", "lock", "it"}) {
            words.add(w);
        }
        DataStreamSource<String> source = env.fromCollection(words);

        // step3: tokenize each element into (word, 1) pairs, then key by the
        // word (tuple field 0) and sum the counts (tuple field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
                source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // StringTokenizer splits on whitespace by default
                        for (StringTokenizer st = new StringTokenizer(sentence); st.hasMoreTokens(); ) {
                            out.collect(new Tuple2<>(st.nextToken(), 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = tokenized.keyBy(0).sum(1);

        // step4: print the running counts with a single-parallelism sink so
        // the output is not interleaved across subtasks
        counts.print().setParallelism(1);

        // submit the job — nothing runs until execute() is called
        env.execute("WordCount_Job");
    }
}
